"""
Case Type   : 发布订阅
Case Name   : 以binary形式发布订阅ustore二级分区表
Create At   : 2022-11-04
Owner       : opentestcase004
Description :
    1.创建发布端
    2.创建订阅端
    3.在两个集群创建表
    4.发布端插入表数据
    5.订阅端查询是否同步
    6.发布端修改表数据
    7.订阅端查询是否同步
    8.清理环境
Expect      :
    1.成功
    2.成功
    3.成功
    4.成功
    5.数据同步
    6.成功
    7.数据同步
    8.成功
History     : 
"""

import os
import unittest
from yat.test import macro
from yat.test import Node
from testcase.utils.Common import Common
from testcase.utils.CommonSH import CommonSH
from testcase.utils.Constant import Constant
from testcase.utils.Logger import Logger

Primary_SH = CommonSH('PrimaryDbUser')


@unittest.skipIf(3 != Primary_SH.get_node_num(), '非1+2环境不执行')
class Pubsubclass(unittest.TestCase):
    def setUp(self):
        self.log = Logger()
        self.log.info("-----------this is setup-----------")
        self.log.info(f"-----{os.path.basename(__file__)} start-----")
        self.pri_userdb_pub = Node(node='PrimaryDbUser')
        self.pri_userdb_sub = Node(node='remote1_PrimaryDbUser')
        self.constant = Constant()
        self.commsh_pub = CommonSH('PrimaryDbUser')
        self.commsh_sub = CommonSH('remote1_PrimaryDbUser')
        self.com_pub = Common()
        self.com_sub = Common('remote1_PrimaryDbUser')
        self.tb_name1 = 't_pub_sub_0193_1'
        self.tb_name2 = 't_pub_sub_0193_2'
        self.tb_name3 = 't_pub_sub_0193_3'
        self.subname = "s_pub_sub_0193"
        self.pubname = "p_pub_sub_0193"
        self.parent_path_pub = os.path.dirname(macro.DB_INSTANCE_PATH)
        self.parent_path_sub = os.path.dirname(macro.DB_INSTANCE_PATH_REMOTE1)
        self.port = str(int(self.pri_userdb_pub.db_port) + 1)
        self.wal_level = self.com_pub.show_param("wal_level")
        self.enable_default_ustore_table = \
            self.com_pub.show_param("enable_default_ustore_table")
        self.enable_default_ustore_table1 = \
            self.com_sub.show_param("enable_default_ustore_table",
                                    macro.DB_ENV_PATH_REMOTE1)
        self.user_param_pub = f'-U {self.pri_userdb_pub.db_user} ' \
            f'-W {self.pri_userdb_pub.db_password}'
        self.user_param_sub = f'-U {self.pri_userdb_sub.db_user} ' \
            f'-W {self.pri_userdb_sub.db_password}'
        cmd = f"cp " \
            f"{os.path.join(macro.DB_INSTANCE_PATH, 'pg_hba.conf')} " \
            f"{os.path.join(self.parent_path_pub, 'pg_hba.conf')};"
        self.log.info(cmd)
        result = self.pri_userdb_pub.sh(cmd).result()
        self.log.info(result)
        cmd = f"cp " \
            f"{os.path.join(macro.DB_INSTANCE_PATH_REMOTE1, 'pg_hba.conf')}" \
            f" {os.path.join(self.parent_path_sub, 'pg_hba.conf')};"
        self.log.info(cmd)
        result = self.pri_userdb_sub.sh(cmd).result()
        self.log.info(result)
        self.hostname = self.pri_userdb_sub.sh('hostname').result().strip()
        text = "--step:预置条件,修改pg_hba expect:成功"
        self.log.info(text)
        guc_res = self.commsh_pub.execute_gsguc(
            'reload', self.constant.GSGUC_SUCCESS_MSG, '',
            'all', False, False, '',
            f'host    replication  {self.pri_userdb_sub.db_user} '
            f'{self.pri_userdb_sub.db_host}/32 sha256')
        self.log.info(guc_res)
        self.assertTrue(guc_res, '执行失败' + text)
        guc_res = self.commsh_pub.execute_gsguc(
            'reload', self.constant.GSGUC_SUCCESS_MSG, '',
            'all', False, False, '',
            f'host    all  {self.pri_userdb_pub.db_user} '
            f'{self.pri_userdb_pub.db_host}/32 sha256')
        self.log.info(guc_res)
        self.assertTrue(guc_res, '执行失败' + text)
        result = self.commsh_pub.execute_gsguc(
            'set', self.constant.GSGUC_SUCCESS_MSG, 'wal_level=logical')
        self.assertTrue(result, '执行失败' + text)
        result = self.commsh_pub.restart_db_cluster(True)
        flg = self.constant.START_SUCCESS_MSG in result or 'Degrade' in result
        self.assertTrue(flg, '执行失败' + text)
        guc_res = self.commsh_sub.execute_gsguc(
            'reload', self.constant.GSGUC_SUCCESS_MSG, '',
            'all', False, False, macro.DB_INSTANCE_PATH_REMOTE1,
            f'host    replication  {self.pri_userdb_pub.db_user} '
            f'{self.pri_userdb_pub.db_host}/32 sha256',
            macro.DB_ENV_PATH_REMOTE1)
        self.log.info(guc_res)
        self.assertTrue(guc_res, '执行失败' + text)
        guc_res = self.commsh_sub.execute_gsguc(
            'reload', self.constant.GSGUC_SUCCESS_MSG, '',
            'all', False, False, macro.DB_INSTANCE_PATH_REMOTE1,
            f'host all  {self.pri_userdb_sub.db_user} '
            f'{self.pri_userdb_sub.db_host}/32 sha256',
            macro.DB_ENV_PATH_REMOTE1)
        self.log.info(guc_res)
        self.assertTrue(guc_res, '执行失败' + text)
        pub_result = self.commsh_pub.execute_gsguc(
            'reload', self.constant.GSGUC_SUCCESS_MSG,
            'enable_default_ustore_table=on')
        self.log.info(pub_result)
        self.assertTrue(pub_result, '执行失败' + text)
        sub_result = self.commsh_sub.execute_gsguc(
            'reload', self.constant.GSGUC_SUCCESS_MSG,
            "enable_default_ustore_table=on",
            dn_path=macro.DB_INSTANCE_PATH_REMOTE1,
            env_path=macro.DB_ENV_PATH_REMOTE1)
        self.log.info(sub_result)
        self.assertTrue(sub_result, '执行失败' + text)

    def test_pub_sub(self):
        text = "--step1:创建发布端 expect:成功--"
        self.log.info(text)
        sql_cmd = f"drop publication if exists {self.pubname};" \
            f"create publication {self.pubname} for all tables;"
        pub_result = self.commsh_pub.execut_db_sql(
            sql_cmd, sql_type=self.user_param_pub)
        self.log.info(pub_result)
        self.assertIn(self.constant.drop_pub_succ_msg, pub_result,
                      '执行失败' + text)
        self.assertIn(self.constant.create_pub_succ_msg, pub_result,
                      '执行失败' + text)

        text = "--step2:创建订阅指定为二进制模式 expect:成功--"
        self.log.info(text)
        result = self.commsh_sub.execute_generate(
            macro.COMMON_PASSWD, env_path=macro.DB_ENV_PATH_REMOTE1)
        self.assertIn(self.constant.create_keycipher_success, result,
                      '执行失败' + text)
        sql_cmd = f"drop subscription if exists {self.subname};" \
            f"create subscription {self.subname} connection " \
            f"'host={self.pri_userdb_pub.db_host} " \
            f"port={self.port} user={self.pri_userdb_pub.db_user} " \
            f"dbname={self.pri_userdb_pub.db_name} " \
            f"password={self.pri_userdb_pub.ssh_password}' publication " \
            f"{self.pubname} with (binary=True);"
        sub_result = self.commsh_sub.execut_db_sql(
            sql_cmd, self.user_param_sub, None,
            macro.DB_ENV_PATH_REMOTE1)
        self.log.info(sub_result)
        self.assertIn(self.constant.drop_sub_succ_msg, sub_result,
                      '执行失败' + text)
        self.assertIn(self.constant.create_sub_succ_msg, sub_result, '执行失败'
                      + text)

        text = "--step3:在两个集群创建表 expect:成功--"
        self.log.info(text)
        sql_cmd = f"drop table if exists {self.tb_name1},{self.tb_name2}," \
            f"{self.tb_name3};" \
            f"create table {self.tb_name1}(c_int int primary key,c_text " \
            f"text);create table {self.tb_name2}(c_int int primary key " \
            f"references {self.tb_name1}(c_int),c_text text);" \
            f"create table {self.tb_name3}(month_code varchar2(30) not null " \
            f"primary key,dept_code varchar2(30) not null,user_no " \
            f"varchar2(30) not null,sales_amt int) " \
            f"with (storage_type=ustore) " \
            f"partition by list (month_code) subpartition by list(dept_code)" \
            f"(" \
            f"      partition p1 values ('201902')" \
            f"(     subpartition p1_a values ('1')," \
            f"      subpartition p1_b values (default)" \
            f")," \
            f"      partition p2 values ('201903')" \
            f"(     subpartition p2_a values ('1')," \
            f"      subpartition p2_b values ('2')" \
            f")" \
            f");"
        pub_result = self.commsh_pub.execut_db_sql(
            sql_cmd, sql_type=self.user_param_pub)
        self.log.info(pub_result)
        self.assertEqual(pub_result.count(self.constant.DROP_TABLE_SUCCESS),
                         1, '执行失败' + text)
        self.assertEqual(pub_result.count(self.constant.TABLE_CREATE_SUCCESS),
                         6, '执行失败' + text)
        sub_result = self.commsh_sub.execut_db_sql(
            sql_cmd, self.user_param_sub, None, macro.DB_ENV_PATH_REMOTE1)
        self.log.info(sub_result)
        self.assertEqual(sub_result.count(self.constant.DROP_TABLE_SUCCESS),
                         1, '执行失败' + text)
        self.assertEqual(sub_result.count(self.constant.TABLE_CREATE_SUCCESS),
                         6, '执行失败' + text)

        text = "--step4:发布端向表插入数据 expect:成功--"
        self.log.info(text)
        sql_cmd = f"insert into {self.tb_name1} values(1,'first');" \
            f"insert into {self.tb_name2} values(1,'first');" \
            f"insert into {self.tb_name3} values('201902','first','1',1);"
        pub_result = self.commsh_pub.execut_db_sql(
            sql_cmd, sql_type=self.user_param_pub)
        self.log.info(pub_result)
        self.assertEqual(pub_result.count(self.constant.INSERT_SUCCESS_MSG), 3,
                         '执行失败' + text)

        text = "--step5:订阅端查询是否同步 expect:数据同步--"
        self.log.info(text)
        sql_select = f"alter subscription {self.subname} refresh publication;" \
            f"select pg_sleep(5.5);select * from {self.tb_name1};" \
            f"select * from {self.tb_name2};select * from {self.tb_name3};"
        sub_result = self.commsh_sub.execut_db_sql(
            sql_select, self.user_param_sub, None, macro.DB_ENV_PATH_REMOTE1)
        self.log.info(sub_result)
        self.assertEqual(sub_result.count('1 | first'), 2, '执行失败' + text)
        self.assertIn("201902     | first     | 1       |         1",
                      sub_result, '执行失败' + text)

        text = "--step6:发布端修改表数据 expect:成功--"
        self.log.info(text)
        sql_cmd = f"delete from {self.tb_name2};update {self.tb_name1} set " \
            f"c_text='new';update {self.tb_name3} set dept_code='2';"
        pub_result = self.commsh_pub.execut_db_sql(
            sql_cmd, sql_type=self.user_param_pub)
        self.log.info(pub_result)
        self.assertIn(self.constant.DELETE_SUCCESS_MSG, pub_result, '执行失败'
                      + text)
        self.assertEqual(pub_result.count(self.constant.UPDATE_SUCCESS_MSG), 2,
                         '执行失败' + text)

        text = "--step7:订阅端查询是否同步 expect:数据同步--"
        self.log.info(text)
        sql_select = \
            f"select pg_sleep(5.5);select * from {self.tb_name1};" \
            f"select count(*) from {self.tb_name2};" \
            f"select * from {self.tb_name3};"
        sub_result = self.commsh_sub.execut_db_sql(
            sql_select, self.user_param_sub, None, macro.DB_ENV_PATH_REMOTE1)
        self.log.info(sub_result)
        self.assertEqual(str(sub_result.splitlines()[7]).strip(), '1 | new',
                         '执行失败' + text)
        self.assertEqual(str(sub_result.splitlines()[12]).strip(), '0',
                         '执行失败' + text)
        self.assertIn('201902     | 2         | 1       |         1',
                      sub_result.splitlines()[-2], '执行失败' + text)

    def tearDown(self):
        text = "--step8:清理环境 expect:成功--"
        self.log.info(text)
        sql_cmd = f"drop subscription if exists {self.subname};"
        drop_sub_result = self.commsh_sub.execut_db_sql(
            sql_cmd, self.user_param_sub, None, macro.DB_ENV_PATH_REMOTE1)
        self.log.info(drop_sub_result)
        sql_cmd = f"drop publication if exists {self.pubname};"
        drop_pub_result = self.commsh_pub.execut_db_sql(
            sql_cmd, sql_type=self.user_param_pub)
        self.log.info(drop_pub_result)
        sql_cmd = f"drop table if exists {self.tb_name2};" \
            f"drop table if exists {self.tb_name1};" \
            f"drop table if exists {self.tb_name3};"
        result_sub = \
            self.commsh_sub.execut_db_sql(sql_cmd, self.user_param_sub, None,
                                          macro.DB_ENV_PATH_REMOTE1)
        self.log.info(result_sub)
        result_pub = self.commsh_pub.execut_db_sql(
            sql_cmd, sql_type=self.user_param_pub)
        self.log.info(result_pub)
        cmd = f"mv " \
            f"{os.path.join(self.parent_path_pub, 'pg_hba.conf')} " \
            f"{os.path.join(macro.DB_INSTANCE_PATH, 'pg_hba.conf')} "
        self.log.info(cmd)
        sh_result = self.pri_userdb_pub.sh(cmd).result()
        self.log.info(sh_result)
        cmd = f"mv " \
            f"{os.path.join(self.parent_path_sub, 'pg_hba.conf')} " \
            f"{os.path.join(macro.DB_INSTANCE_PATH_REMOTE1, 'pg_hba.conf')} "
        self.log.info(cmd)
        sh_result1 = self.pri_userdb_sub.sh(cmd).result()
        self.log.info(sh_result1)
        result1 = self.commsh_pub.execute_gsguc(
            'set', self.constant.GSGUC_SUCCESS_MSG,
            f'wal_level={self.wal_level}')
        self.log.info(result1)
        result2 = self.commsh_pub.execute_gsguc(
            'reload', self.constant.GSGUC_SUCCESS_MSG,
            f'enable_default_ustore_table={self.enable_default_ustore_table}')
        self.log.info(result2)
        result3 = self.commsh_sub.execute_gsguc(
            'reload', self.constant.GSGUC_SUCCESS_MSG,
            f"enable_default_ustore_table={self.enable_default_ustore_table1}",
            dn_path=macro.DB_INSTANCE_PATH_REMOTE1,
            env_path=macro.DB_ENV_PATH_REMOTE1)
        self.log.info(result3)
        pub_result = self.commsh_pub.restart_db_cluster(True)
        sub_result = self.commsh_sub.restart_db_cluster(
            True, macro.DB_ENV_PATH_REMOTE1)
        self.assertTrue(pub_result, '执行失败' + text)
        self.assertTrue(sub_result, '执行失败' + text)
        self.assertTrue(result1, '执行失败' + text)
        self.assertTrue(result2, '执行失败' + text)
        self.assertTrue(result3, '执行失败' + text)
        self.assertEqual("", sh_result, '执行失败' + text)
        self.assertEqual("", sh_result1, '执行失败' + text)
        self.assertIn(self.constant.drop_pub_succ_msg, drop_pub_result,
                      '执行失败' + text)
        self.assertIn(self.constant.drop_sub_succ_msg, drop_sub_result,
                      '执行失败' + text)
        self.assertEqual(result_sub.count(self.constant.DROP_TABLE_SUCCESS), 3,
                         '执行失败' + text)
        self.assertEqual(result_pub.count(self.constant.DROP_TABLE_SUCCESS), 3,
                         '执行失败' + text)
        self.log.info(f"-----{os.path.basename(__file__)} end-----")

